This notebook is adapted, almost verbatim, from the first part of: http://nbviewer.ipython.org/github/vals/scilife-python-course/blob/master/parallel%20python.ipynb
multiprocessing
The built-in multiprocessing module provides functionality to create processes that run given tasks.
http://docs.python.org/2/library/multiprocessing.html
All strategies for parallelization in Python have a rather large overhead compared to lower-level languages such as C or FORTRAN.
multiprocessing runs code in parallel by launching subprocesses, each with its own separate interpreter. This means that, in order to gain speed, the computation we want to perform should be substantial enough to outweigh the overhead of starting these processes.
(In case you are familiar with threads: it should be noted that Python has a threading module for working with threads; however, the global interpreter lock ensures that only one thread executes Python bytecode at a time, so CPU-bound threads effectively run on a single CPU.)
By using multiprocessing we can utilize the machines we run our code on more efficiently.
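To make the contrast with threads concrete, here is a minimal, hypothetical timing sketch (the function name busy and the loop count are made up for illustration): on a multi-core machine the two processes should finish in roughly half the time the two threads need, because the threads take turns holding the interpreter lock.

import time
from threading import Thread
from multiprocessing import Process

def busy(n=10000000):
    # CPU-bound work: a tight loop that keeps the interpreter busy
    while n > 0:
        n -= 1

if __name__ == "__main__":
    for cls in (Thread, Process):
        workers = [cls(target=busy) for _ in range(2)]
        t0 = time.time()
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        print("{}: {:.2f} s".format(cls.__name__, time.time() - t0))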
In [1]:
import multiprocessing
In [2]:
multiprocessing.cpu_count()
Out[2]:
Before talking about some more advanced features, let's describe the most typical use pattern of multiprocessing.
Note: multiprocessing can be used in the IPython Notebook, but there are sometimes issues with printing from subprocesses. To make things clearer and avoid complications we shall run external scripts instead.
Process
Processes share nothing
To spawn a process, instantiate a Process with a target function and call its .start() method. This arranges for the given code to be run in a separate process from the parent process. To make the parent process wait until a process has finished before moving on, one needs to call the .join() method.
In [3]:
import os
os.getpid()
Out[3]:
In [4]:
%%file mp.py
from multiprocessing import Process
import os
def worker():
    print("Worker process {}".format(os.getpid()))

if __name__ == "__main__":
    proc1 = Process(target=worker)
    proc1.start()
    proc2 = Process(target=worker)
    proc2.start()
    # Wait for both workers before the parent exits
    proc1.join()
    proc2.join()
In [4]:
import subprocess
def run(script_file):
    # Launch a fresh interpreter for the script and print its output
    out = subprocess.Popen(['python', script_file],
                           stdout=subprocess.PIPE).communicate()[0]
    print(out.decode())
In [5]:
run('mp.py')
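The heading above says that processes share nothing, and we can see this directly. In the standalone script sketched below (the variable name counter is made up for illustration), a child process increments a module-level variable, but the parent's copy is unchanged, because each process works on its own copy of the program's state.

from multiprocessing import Process

counter = 0

def increment():
    global counter
    counter += 1
    print("In child: counter = {}".format(counter))   # prints 1

if __name__ == "__main__":
    proc = Process(target=increment)
    proc.start()
    proc.join()
    print("In parent: counter = {}".format(counter))  # still prints 0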
To get the target function to actually work on some input, you need to provide the arguments in the constructor of the Process, via the args keyword.
In [23]:
%%file mp.py
from multiprocessing import Process
import os
def worker(arg):
    print("Worker process {}, argument was {}".format(os.getpid(), arg))

if __name__ == "__main__":
    proc1 = Process(target=worker, args=(10,))
    proc1.start()
    proc2 = Process(target=worker, args=(11,))
    proc2.start()
In [24]:
run('mp.py')
Processes communicate over interprocess communication channels
Queue
Pipe
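Of these, only the Queue is used in the examples below. For completeness, here is a minimal sketch of the Pipe counterpart (the function and variable names are made up for illustration): Pipe() returns two connection objects, and each end can send and receive Python objects.

from multiprocessing import Process, Pipe

def worker(conn):
    msg = conn.recv()        # block until the parent sends something
    conn.send(msg.upper())   # reply through the same connection
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    proc = Process(target=worker, args=(child_conn,))
    proc.start()
    parent_conn.send("hello")
    print(parent_conn.recv())  # prints HELLO
    proc.join()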
In [25]:
%%file mp2.py
from multiprocessing import Process, Queue
import os
def worker(tasks, results):
    t = tasks.get()
    result = t * 2
    results.put([os.getpid(), t, "->", result])

if __name__ == "__main__":
    n = 20
    my_tasks = Queue()
    my_results = Queue()
    workers = [Process(target=worker, args=(my_tasks, my_results)) for i in range(n)]
    for proc in workers:
        proc.start()
    for i in range(n):
        my_tasks.put(i)
    for i in range(n):
        result = my_results.get()
        print(result)
In [26]:
run('mp2.py')
Because the processes are executed in parallel, we can never know in which order the results are put in the Queue.
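If a deterministic order is needed, a common pattern is to tag each result with its task value, as worker() above already does, and sort after everything has been collected. A small sketch of the idea (the pid values below are made-up placeholders):

# Lists in the shape put by worker() above: [pid, task, "->", result]
collected = [[4212, 2, "->", 4], [4210, 0, "->", 0], [4211, 1, "->", 2]]
ordered = sorted(collected, key=lambda item: item[1])  # sort on the task value
print(ordered)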
In [27]:
from multiprocessing import Queue
In [28]:
q = Queue()
In [29]:
q.get?
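As the docstring shows, q.get() blocks by default until an item is available. A minimal sketch of the non-blocking variants (both raise the Empty exception from the queue module when nothing arrives):

from multiprocessing import Queue
try:
    from queue import Empty   # Python 3
except ImportError:
    from Queue import Empty   # Python 2

q = Queue()
try:
    q.get(timeout=1)       # wait at most one second
except Empty:
    print("No item arrived within the timeout")
try:
    q.get(block=False)     # return immediately if the queue is empty
except Empty:
    print("Queue was empty")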
Besides message passing, multiprocessing also provides a Manager, which holds shared objects, such as a list, that several processes can update.
In [42]:
%%file mp3.py
from multiprocessing import Process, Manager
import os
def worker(l):
    p = os.getpid()
    #l[int(str(p)[-1:])] = p
    print(p)
    # Use the last digit of the pid as an index into the shared list
    l[int(str(p)[-1:])] += 1

if __name__ == "__main__":
    n = 10
    manager = Manager()
    l = manager.list()
    l.extend([0] * n)
    processes = [Process(target=worker, args=(l,)) for i in range(20)]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    print(l)
    # Note: += on a managed list is a read followed by a write, so
    # concurrent updates can be lost and the sum may end up below 20.
    print(sum(l))
In [43]:
run('mp3.py')
A Pool manages a fixed number of worker processes and distributes tasks among them.
In [44]:
%%file mp.py
import multiprocessing
import os
def task(args):
    print("Running process {} with args {}".format(os.getpid(), args))
    return os.getpid(), args

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = pool.map(task, [1, 2, 3, 4] * 3)
    print(result)
In [45]:
run('mp.py')
The method .map() works like the built-in function map(), but will send data from the iterable to different processes. By default it will send one element at a time, but this can be changed with the chunksize parameter.
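For example, a sketch of passing chunksize (the task function mirrors the one above): with chunksize=3 each worker receives the iterable in batches of three consecutive elements, which reduces communication overhead when individual tasks are cheap; .map() still returns the results in the order of the input.

import multiprocessing
import os

def task(x):
    return os.getpid(), x

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    # Each worker now pulls three consecutive elements at a time
    result = pool.map(task, range(12), chunksize=3)
    pool.close()
    pool.join()
    print(result)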
A similar method called .map_async() does not block while the results are computed and usually performs better in parallel, but in that case one has to fetch the results using the .get() method of the value returned by .map_async() (which is an instance of the class AsyncResult).
In [50]:
%%file mp.py
import multiprocessing
import os
def task(args):
    print("Running process {} with args {}".format(os.getpid(), args))
    return os.getpid(), args

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = pool.map_async(task, [1, 2, 3, 4])
    print(result.get())
In [51]:
run('mp.py')